/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics2.sink.kafka;

import java.io.Closeable;
import java.util.Locale;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.reporting.ConsoleReporter;
import com.yammer.metrics.reporting.CsvReporter;

/**
 * The abstract base class for all scheduled reporters (i.e., reporters which
 * process a registry's metrics periodically).
 *
 * @see ConsoleReporter
 * @see CsvReporter
 * @see Slf4jReporter
 */
public abstract class ScheduledReporter implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(ScheduledReporter.class);

  /**
   * A simple named thread factory.
   */
  @SuppressWarnings("NullableProblems")
  private static class NamedThreadFactory implements ThreadFactory {
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    private NamedThreadFactory(String name) {
      final SecurityManager s = System.getSecurityManager();
      this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
      this.namePrefix = "metrics-" + name + "-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
      final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
      t.setDaemon(true);
      if (t.getPriority() != Thread.NORM_PRIORITY) {
        t.setPriority(Thread.NORM_PRIORITY);
      }
      return t;
    }
  }

  private static final AtomicInteger FACTORY_ID = new AtomicInteger();

  private final MetricsRegistry registry;
  private final ScheduledExecutorService executor;
  private final double durationFactor;
  private final String durationUnit;
  private final double rateFactor;
  private final String rateUnit;

  /**
   * Creates a new {@link ScheduledReporter} instance.
   *
   * @param registry
   *          the {@link com.codahale.metrics.MetricRegistry} containing the
   *          metrics this reporter will report
   * @param name
   *          the reporter's name
   * @param rateUnit
   *          a unit of time
   * @param durationUnit
   *          a unit of time
   */
  protected ScheduledReporter(MetricsRegistry registry, String name, TimeUnit rateUnit, TimeUnit durationUnit) {
    this(registry, rateUnit, durationUnit, Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(name + '-'
        + FACTORY_ID.incrementAndGet())));
  }

  /**
   * Creates a new {@link ScheduledReporter} instance.
   *
   * @param registry
   *          the {@link com.codahale.metrics.MetricRegistry} containing the
   *          metrics this reporter will report
   * @param executor
   *          the executor to use while scheduling reporting of metrics.
   */
  protected ScheduledReporter(MetricsRegistry registry, TimeUnit rateUnit, TimeUnit durationUnit,
      ScheduledExecutorService executor) {
    this.registry = registry;
    this.executor = executor;
    this.rateFactor = rateUnit.toSeconds(1);
    this.rateUnit = calculateRateUnit(rateUnit);
    this.durationFactor = 1.0 / durationUnit.toNanos(1);
    this.durationUnit = durationUnit.toString().toLowerCase(Locale.US);
  }

  /**
   * Starts the reporter polling at the given period.
   *
   * @param period
   *          the amount of time between polls
   * @param unit
   *          the unit for {@code period}
   */
  public void start(long period, TimeUnit unit) {
    executor.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        try {
          report();
        } catch (RuntimeException ex) {
          LOG.error("RuntimeException thrown from {}#report. Exception was suppressed.", ScheduledReporter.this
              .getClass().getSimpleName(), ex);
        }
      }
    }, period, period, unit);
  }

  /**
   * Stops the reporter and shuts down its thread of execution.
   *
   * Uses the shutdown pattern from
   * http://docs.oracle.com/javase/7/docs/api/java
   * /util/concurrent/ExecutorService.html
   */
  public void stop() {
    executor.shutdown(); // Disable new tasks from being submitted
    try {
      // Wait a while for existing tasks to terminate
      if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
        executor.shutdownNow(); // Cancel currently executing tasks
        // Wait a while for tasks to respond to being cancelled
        if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
          System.err.println(getClass().getSimpleName() + ": ScheduledExecutorService did not terminate");
        }
      }
    } catch (InterruptedException ie) {
      // (Re-)Cancel if current thread also interrupted
      executor.shutdownNow();
      // Preserve interrupt status
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Stops the reporter and shuts down its thread of execution.
   */
  @Override
  public void close() {
    stop();
  }

  /**
   * Report the current values of all metrics in the registry.
   */
  public void report() {
    synchronized (this) {
      report(registry.allMetrics().entrySet());
    }
  }

  /**
   * Called periodically by the polling thread. Subclasses should report all the
   * given metrics.
   *
   * @param metrics
   *          all the metrics in the registry
   */
  public abstract void report(Set<Entry<MetricName, Metric>> metrics);

  protected String getRateUnit() {
    return rateUnit;
  }

  protected String getDurationUnit() {
    return durationUnit;
  }

  protected double convertDuration(double duration) {
    return duration * durationFactor;
  }

  protected double convertRate(double rate) {
    return rate * rateFactor;
  }

  private String calculateRateUnit(TimeUnit unit) {
    final String s = unit.toString().toLowerCase(Locale.US);
    return s.substring(0, s.length() - 1);
  }
}
